Skip to content

Conversation

@vjsingh1984
Copy link
Contributor

@vjsingh1984 vjsingh1984 commented Feb 9, 2026

Fixes #396 - Prevents spurious retry attempts after a subscription completes (receives EndOfStream).

Problem

After a subscription completes (e.g., OpenOrderEnd message), stray messages remaining in the message bus queue triggered unnecessary retry attempts. This caused:

  • Warning spam: "retrying after unexpected response" logged up to 10 times
  • Wasted CPU cycles: Up to 10 decode attempts per stray message
  • Misleading logs: Users saw warnings even though subscription completed successfully

Solution

Track stream completion state with stream_ended: Arc<AtomicBool> field to prevent retries after EndOfStream:

pub struct Subscription<T> {
    stream_ended: Arc<AtomicBool>,  // Persists across async calls
    // ...
}

ProcessingResult::EndOfStream => {
    self.stream_ended.store(true, Ordering::Release);
    return None;
}
ProcessingResult::Retry => {
    // Stop retrying if stream has already ended
    if self.stream_ended.load(Ordering::Acquire) {
        return None;  // ✅ Blocks retries after completion
    }
    // ... normal retry logic
}

Key improvement: The stream_ended flag is stored as a struct field (not a local variable), so it persists across async next() calls. This prevents retries even when stray messages arrive in the shared message bus queue after the subscription has completed.

Changes

  • Added stream_ended: Arc<AtomicBool> field to Subscription<T> struct
  • Initialize to false in all constructors
  • Update Clone implementation to include the field
  • Set to true on EndOfStream, cancel(), and drop()
  • Check the field before retrying on UnexpectedResponse

Testing

All existing tests pass. The fix is internal state management with no API changes.

Note

This fix addresses spurious retries for single subscription instances. For applications that create multiple subscriptions (e.g., polling open_orders() every 30 seconds), additional caching may be needed at the application layer to prevent creating duplicate subscriptions on the shared message bus.

vjsingh1984 and others added 6 commits February 7, 2026 19:33
Optionally disables Nagle's algorithm on TcpStream when the environment
variable IBAPI_TCP_NODELAY=1 is set. Default behavior is unchanged
(Nagle enabled, matching upstream).

When enabled, small writes (order submissions ~100-200 bytes) are sent
immediately instead of being buffered up to 40ms. For trading systems
this eliminates latency on order routing with zero practical downside.

Usage:
  IBAPI_TCP_NODELAY=1 ./my-trading-app

Affected paths:
- sync: TcpSocket::new() and TcpSocket::reconnect()
- async: AsyncConnection::connect_with_callback() and reconnect()
Adds 10-minute bar size to the BarSize enum for historical data fetching,
matching IBKR TWS/Gateway API support.

Changes:
- Add Min10 variant to BarSize enum (between Min5 and Min15)
- Display format: "10 mins"
- FromStr parsing: "MIN10" → BarSize::Min10
- Update tests to include Min10 coverage

This aligns with the official IBKR API specification which supports
10-minute bars for historical data requests.
IBKR returns bar dates as 'YYYYMMDD  HH:MM:SS' (with two spaces)
but the parser only handled:
1. Exactly 8 characters ('YYYYMMDD')
2. Unix timestamps

This caused parsing errors like 'the year component could not be parsed'
for many symbols during warmup.

Fix: Add support for parsing dates with time component in the
IBKR-specific format (two spaces between date and time).

Related: Live trading warmup failures on Windows
When a subscription completes normally (e.g., open_orders receives
OpenOrderEnd), stray messages in the message bus queue were causing
the subscription to retry decoding up to 10 times, logging warnings
for each retry attempt.

The fix tracks whether the stream has ended and stops retrying
UnexpectedResponse messages after EndOfStream, preventing these
spurious warnings and unnecessary retry loops.

Changes:
- Added stream_ended flag to track subscription state
- Check stream_ended before retrying on UnexpectedResponse
- Set stream_ended = true when EndOfStream is received
- Return immediately if retry is attempted after stream ended

This fixes the "retrying after unexpected response" warnings that
appeared every 2 minutes after open orders fetch completed.

Fixes: vjsingh1984/ibkrtrading issue with IBKR subscription warnings

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Sync with upstream 2.8.0 release which includes:
- ConnectionOptions with TCP_NODELAY support
- Min10 bar size for historical data
- update_display_group method
- Refactored connection API

Resolves merge conflicts in connection/async.rs and transport/sync.rs
by accepting upstream implementations (explicit parameters vs env vars).

Our fix for spurious retries (stream_ended flag) is preserved in
src/subscriptions/async.rs and remains functional.
- Add stream_ended field as Arc<AtomicBool> (follows cancelled pattern)
- Initialize in all constructors (with_decoder, new)
- Update Clone implementation to include stream_ended
- Replace local variable with self.stream_ended.load/store
- Set stream_ended=true on cancel and drop

This fixes the bug where stream_ended was a local variable that
reset on each call to next(), causing spurious retries to continue
after EndOfStream.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Bug: Spurious retry attempts after subscription EndOfStream causes log spam

1 participant